/*
 *  +----------------------------------------------------------------------
 *  | sophon [ A FAST GAME FRAMEWORK ]
 *  +----------------------------------------------------------------------
 *  | Copyright (c) 2023-2029 All rights reserved.
 *  +----------------------------------------------------------------------
 *  | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
 *  +----------------------------------------------------------------------
 *  | Author: jqiris <1920624985@qq.com>
 *  +----------------------------------------------------------------------
 */

use std::time::Duration;

use conhash::ConsistentHash;
use dashmap::*;
use etcd_client::{ConnectOptions, WatchOptions, *};

use crate::{discover::*, errors::DiscoverError, hash::HashNode};

/// Service discoverer backed by etcd.
///
/// Holds an etcd client, in-memory indexes of the servers seen so far, and the
/// handlers to notify when server or data events arrive.
pub struct EtcdDiscoverer {
    /// etcd endpoints; also used to open the extra connections created for watches.
    endpoints: Vec<String>,
    /// Key prefixes and connection options.
    config: EtcdOptions,
    /// Client used for get/put/delete requests.
    client: Client,
    /// Known servers, keyed by server id.
    server_map: DashMap<String, Server>,
    /// Known servers grouped by server type.
    server_type_map: DashMap<String, ServerTypeItem>,
    /// Handlers invoked for server put/delete events.
    server_event_handlers: Vec<ServerHander>,
    /// Handlers invoked for put events under the data prefix.
    data_event_handlers: Vec<DataHander>,
}

impl EtcdDiscoverer {
    /// Connects to etcd at `endpoints` and builds a discoverer with empty
    /// server indexes and no registered handlers.
    ///
    /// `options` falls back to [`EtcdOptions::new`] when it is `None`.
    ///
    /// # Panics
    ///
    /// Panics if the connection to etcd cannot be established.
    pub async fn new(endpoints: Vec<String>, options: Option<EtcdOptions>) -> Self {
        let config = match options {
            Some(custom) => custom,
            None => EtcdOptions::new(),
        };
        let connecting = Client::connect(&endpoints, config.options.clone());
        let client = connecting.await.unwrap();
        Self {
            server_map: DashMap::new(),
            server_type_map: DashMap::new(),
            server_event_handlers: Vec::new(),
            data_event_handlers: Vec::new(),
            endpoints,
            config,
            client,
        }
    }
    /// Builds the etcd key for `server`: the configured server prefix followed
    /// by whatever `reg_server_item` produces for that server.
    fn server_key(&self, server: &Server) -> String {
        let prefix = &self.config.server_prefix;
        let item = reg_server_item(server);
        format!("{}{}", prefix, item)
    }

    /// Logs every known server, grouped by server type, between a begin and
    /// an end banner.
    fn dump_servers(&self) {
        const BEGIN: &str = "#####################################DUMP SERVERS BEGIN#################################";
        const SEPARATOR: &str = "------------------------------------------------------------------------------------";
        const END: &str = "#####################################DUMP SERVERS END###################################";

        info!("{}", BEGIN);
        for type_entry in self.server_type_map.iter() {
            // One separator line per server type.
            info!("{}", SEPARATOR);
            for entry in type_entry.list.iter() {
                let server = entry.value();
                info!("type:{}, server:{:?}", server.server_type, server);
            }
        }
        info!("{}", END);
    }

    /// Fetches every server registered under the configured server prefix,
    /// grouped by server type.
    ///
    /// Entries whose value is not valid UTF-8 or cannot be deserialized are
    /// logged and skipped, so a single bad record does not abort the lookup.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoverError::EtcdGetError`] when the etcd request fails.
    pub async fn find_server_list(&self) -> Result<DashMap<String, Vec<Server>>, DiscoverError> {
        let options = Some(GetOptions::new().with_prefix());
        let prefix = self.config.server_prefix.clone();
        let mut client = self.client.clone();
        let res = client
            .get(prefix, options)
            .await
            .map_err(|err| DiscoverError::EtcdGetError(err.to_string()))?;

        let list: DashMap<String, Vec<Server>> = DashMap::new();
        for kv in res.kvs() {
            // Values come from an external store: never assume they are valid UTF-8.
            let data = match kv.value_str() {
                Ok(data) => data,
                Err(err) => {
                    error!("EtcdDiscoverer FindServerList err:{}", err);
                    continue;
                }
            };
            match reg_server_deserialize(data) {
                Ok(server) => {
                    // Single lookup: create the bucket on first use, then append.
                    list.entry(server.server_type.clone())
                        .or_default()
                        .push(server);
                }
                Err(err) => {
                    error!("EtcdDiscoverer FindServerList err:{}", err);
                }
            }
        }
        Ok(list)
    }
    /// Builds the etcd key for a data entry: the configured data prefix
    /// followed by `key`.
    fn data_key(&self, key: &str) -> String {
        let prefix = &self.config.data_prefix;
        let mut full_key = String::with_capacity(prefix.len() + key.len());
        full_key.push_str(prefix);
        full_key.push_str(key);
        full_key
    }
}

#[async_trait]
impl IDiscoverer for EtcdDiscoverer {
    /// Opens a dedicated etcd connection and starts a prefix watch on the
    /// server prefix, returning the stream of watch responses.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoverError::EtcdConnectError`] when the connection cannot
    /// be established and [`DiscoverError::EtcdWatchError`] when the watch
    /// request fails.
    async fn get_server_steam(&self) -> Result<WatchStream, DiscoverError> {
        // Report connection failures to the caller instead of panicking.
        let mut client = Client::connect(&self.endpoints, self.config.options.clone())
            .await
            .map_err(|err| DiscoverError::EtcdConnectError(err.to_string()))?;
        let key = self.config.server_prefix.clone().into_bytes();
        let options = Some(WatchOptions::new().with_prefix());
        // NOTE(review): only the stream half of the watch pair is returned and
        // the watcher half is dropped here — confirm against etcd_client that
        // the stream stays usable without it.
        let (_, stream) = client
            .watch(key, options)
            .await
            .map_err(|err| DiscoverError::EtcdWatchError(err.to_string()))?;
        Ok(stream)
    }
    async fn deal_server_stream(&self, resp: WatchResponse) {
        let mut dump = true;
        for ev in resp.events() {
            let event_type = ev.event_type();
            match event_type {
                EventType::Put => {
                    if let Some(kv) = ev.kv() {
                        match reg_server_deserialize(kv.value_str().unwrap()) {
                            Ok(server) => {
                                let type_server = server.clone();
                                let event_server = server.clone();
                                let server_type = type_server.server_type.to_owned();
                                //加入服务器列表
                                self.server_map.insert(server.server_id.to_owned(), server);
                                //加入服务器类型列表
                                if self.server_type_map.contains_key(&server_type) {
                                    if let Some(mut item) =
                                        self.server_type_map.get_mut(&server_type)
                                    {
                                        item.add(type_server);
                                    }
                                } else {
                                    let mut item = ServerTypeItem::new();
                                    item.add(type_server);
                                    self.server_type_map.insert(server_type, item);
                                }
                                //触发注册事件
                                self.apply_server_event_handlers(&EventType::Put, &event_server)
                                    .await;
                                //是否打印日志
                                if let Some(slient) = event_server.silent {
                                    if slient == 1 {
                                        dump = false;
                                    }
                                }
                            }
                            Err(err) => {
                                error!("reg_server_deserialize err:{}", err);
                            }
                        }
                    }
                }
                EventType::Delete => {
                    if let Some(kv) = ev.kv() {
                        if let Ok(ks) = kv.key_str() {
                            let arr: Vec<&str> = ks.split("/").collect();
                            if arr.len() > 2 {
                                let (server_type, server_id) =
                                    (arr[arr.len() - 2], arr[arr.len() - 1]);
                                //移除服务器列表
                                self.server_map.remove(server_id);
                                //移除服务器类型列表
                                let mut server_type_remove = false;
                                if self.server_type_map.contains_key(server_type) {
                                    if let Some(mut item) =
                                        self.server_type_map.get_mut(&server_type.to_owned())
                                    {
                                        let server = Server {
                                            server_type: server_type.to_owned(),
                                            server_id: server_id.to_owned(),
                                            ..Default::default()
                                        };
                                        let event_server = server.clone();
                                        if item.remove(server) {
                                            if item.list.is_empty() {
                                                server_type_remove = true;
                                            }
                                            //移除事件触发
                                            self.apply_server_event_handlers(
                                                &EventType::Delete,
                                                &event_server,
                                            )
                                            .await;
                                        }
                                    }
                                }
                                if server_type_remove {
                                    self.server_type_map.remove(&server_type.to_owned());
                                }
                            }
                        }
                    }
                }
            }
        }
        if dump {
            self.dump_servers();
        }
    }

    /// Opens a dedicated etcd connection and starts a prefix watch on the
    /// data prefix, returning the stream of watch responses.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoverError::EtcdConnectError`] when the connection cannot
    /// be established and [`DiscoverError::EtcdWatchError`] when the watch
    /// request fails.
    async fn get_data_steam(&self) -> Result<WatchStream, DiscoverError> {
        // Report connection failures to the caller instead of panicking.
        let mut client = Client::connect(&self.endpoints, self.config.options.clone())
            .await
            .map_err(|err| DiscoverError::EtcdConnectError(err.to_string()))?;
        let key = self.config.data_prefix.clone().into_bytes();
        let options = Some(WatchOptions::new().with_prefix());
        // NOTE(review): only the stream half of the watch pair is returned and
        // the watcher half is dropped here — confirm against etcd_client that
        // the stream stays usable without it.
        let (_, stream) = client
            .watch(key, options)
            .await
            .map_err(|err| DiscoverError::EtcdWatchError(err.to_string()))?;
        Ok(stream)
    }
    /// Forwards the key/value of every `Put` event in `resp` to the data
    /// event handlers. `Delete` events are ignored.
    async fn deal_data_stream(&self, resp: WatchResponse) {
        let put_events = resp
            .events()
            .iter()
            .filter(|ev| matches!(ev.event_type(), EventType::Put));
        for kv in put_events.filter_map(|ev| ev.kv()) {
            self.apply_data_event_handlers(kv);
        }
    }
    /// Seeds the local indexes from the servers currently stored in etcd.
    ///
    /// Server types that are already present in the type index are left
    /// untouched. A failed lookup is logged and otherwise ignored.
    async fn init_server_list(&self) {
        let list = match self.find_server_list().await {
            Ok(list) => list,
            Err(err) => {
                error!("init server list err:{}", err);
                return;
            }
        };
        for (server_type, servers) in list {
            if self.server_type_map.contains_key(&server_type) {
                // Already tracked: skip the whole type.
                continue;
            }
            let mut item = ServerTypeItem::new();
            for server in servers {
                self.server_map
                    .insert(server.server_id.clone(), server.clone());
                item.add(server);
            }
            self.server_type_map.insert(server_type, item);
        }
    }

    /// Writes the serialized `server` to etcd under its server key.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoverError::EtcdConnectError`] when the put request fails.
    async fn register(&mut self, server: &Server) -> Result<(), DiscoverError> {
        let key = self.server_key(server);
        let value = reg_server_serialize(server);
        let outcome = self.client.put(key, value, None).await;
        outcome
            .map(|_| ())
            .map_err(|err| DiscoverError::EtcdConnectError(err.to_string()))
    }

    /// Deletes the etcd entry stored under `server`'s server key.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoverError::EtcdDisconnectError`] when the delete request
    /// fails.
    async fn un_register(&mut self, server: &Server) -> Result<(), DiscoverError> {
        let key = self.server_key(server);
        let outcome = self.client.delete(key, None).await;
        outcome
            .map(|_| ())
            .map_err(|err| DiscoverError::EtcdDisconnectError(err.to_string()))
    }

    /// Returns a copy of every known server accepted by the filter built from
    /// `options`, keyed by server id, or `None` when no server matches.
    fn get_server_list(
        &self,
        options: Option<Vec<FilterOption>>,
    ) -> Option<DashMap<String, Server>> {
        let filter = Filter::new(options);
        let matched: DashMap<String, Server> = DashMap::new();
        for entry in self.server_map.iter() {
            if filter.apply(entry.value()) {
                matched.insert(entry.key().clone(), entry.value().clone());
            }
        }
        if matched.is_empty() {
            None
        } else {
            Some(matched)
        }
    }

    /// Looks up the server with id `server_id` and returns a copy of it when
    /// it is accepted by the filter built from `options`.
    ///
    /// The server index is keyed by server id, so this is a direct lookup
    /// rather than a scan over every known server.
    fn get_server_by_id(
        &self,
        server_id: &str,
        options: Option<Vec<FilterOption>>,
    ) -> Option<Server> {
        let filter = Filter::new(options);
        let entry = self.server_map.get(server_id)?;
        if filter.apply(entry.value()) {
            Some(entry.value().clone())
        } else {
            None
        }
    }

    /// Picks a server of type `server_type` for `server_arg` by consistent
    /// hashing, considering only servers accepted by the filter built from
    /// `options`.
    ///
    /// When every server of the type passes the filter, the hasher stored with
    /// the type is used. Otherwise a temporary hasher is built over the
    /// accepted server ids.
    ///
    /// Returns `None` when the type is unknown, has no servers, no server
    /// passes the filter, or the chosen server is no longer in the type's list.
    fn get_server_by_type(
        &self,
        server_type: &str,
        server_arg: &str,
        options: Option<Vec<FilterOption>>,
    ) -> Option<Server> {
        let type_map = self.server_type_map.get(server_type)?;
        let type_count = type_map.list.len();
        if type_count == 0 {
            return None;
        }
        let filter = Filter::new(options);
        let srvs: Vec<String> = type_map
            .list
            .iter()
            .filter(|srv| filter.apply(srv.value()))
            .map(|srv| srv.value().server_id.to_owned())
            .collect();
        // Nothing passed the filter: an empty hash ring has no answer, so
        // report "no server" instead of panicking on the lookup below.
        if srvs.is_empty() {
            return None;
        }
        if srvs.len() == type_count {
            // Every server is eligible: reuse the hasher kept with the type.
            return type_map
                .hasher
                .get_str(server_arg)
                .map(|choice| choice.data());
        }
        // Only a subset is eligible: hash over just those ids.
        let mut hasher = ConsistentHash::new();
        for sid in srvs.iter() {
            let node = HashNode::new(sid);
            hasher.add(&node, 20);
        }
        let choice = hasher.get_str(server_arg)?;
        // Bound to a local so the map guard created by `get` is released
        // before the function's own locals are dropped.
        let chosen = type_map
            .list
            .get(choice.data())
            .map(|server| server.value().clone());
        chosen
    }

    /// Returns a copy of the least-loaded server of type `server_type` among
    /// those accepted by the filter built from `options`.
    ///
    /// A server without a recorded load counts as load 0, matching how
    /// `incre_load` and `decre_load` treat a missing load. When several
    /// servers share the lowest load, the first one encountered wins.
    ///
    /// Returns `None` when the type is unknown, has no servers, or no server
    /// passes the filter.
    fn get_server_by_type_load(
        &self,
        server_type: &str,
        options: Option<Vec<FilterOption>>,
    ) -> Option<Server> {
        let type_map = self.server_type_map.get(server_type)?;
        if type_map.list.is_empty() {
            return None;
        }
        let filter = Filter::new(options);
        let mut best: Option<Server> = None;
        for srv in type_map.list.iter() {
            if !filter.apply(srv.value()) {
                continue;
            }
            // Replace the current pick only when this candidate is strictly
            // less loaded. The previous code re-assigned the current pick to
            // itself here, so the first match was always returned.
            let is_better = match &best {
                Some(current) => srv.load.unwrap_or(0) < current.load.unwrap_or(0),
                None => true,
            };
            if is_better {
                best = Some(srv.value().clone());
            }
        }
        best
    }

    /// Returns a copy of the servers of type `server_type` accepted by the
    /// filter built from `options`, keyed by server id.
    ///
    /// Returns `None` when the type is unknown or has no servers. When the
    /// type has servers but none passes the filter, the result is `Some` with
    /// an empty map.
    fn get_server_type_list(
        &self,
        server_type: &str,
        options: Option<Vec<FilterOption>>,
    ) -> Option<DashMap<String, Server>> {
        let type_map = self.server_type_map.get(server_type)?;
        if type_map.list.is_empty() {
            return None;
        }
        let filter = Filter::new(options);
        let matched: DashMap<String, Server> = DashMap::new();
        for entry in type_map.list.iter() {
            if filter.apply(entry.value()) {
                matched.insert(entry.key().clone(), entry.value().clone());
            }
        }
        Some(matched)
    }

    /// Moves every handler out of `handlers` (leaving it empty) and appends
    /// them to the registered server event handlers.
    async fn register_server_event_handlers(&mut self, handlers: &mut Vec<ServerHander>) {
        self.server_event_handlers.extend(handlers.drain(..));
    }

    /// Invokes every registered server event handler, in registration order,
    /// with `event_type` and `server`.
    async fn apply_server_event_handlers(&self, event_type: &EventType, server: &Server) {
        self.server_event_handlers.iter().for_each(|handler| {
            handler.server_event_handle(event_type, server);
        });
    }

    /// Moves every handler out of `handlers` (leaving it empty) and appends
    /// them to the registered data event handlers.
    fn register_data_event_handlers(&mut self, handlers: &mut Vec<DataHander>) {
        self.data_event_handlers.extend(handlers.drain(..));
    }

    /// Invokes every registered data event handler, in registration order,
    /// with `kv`.
    fn apply_data_event_handlers(&self, kv: &KeyValue) {
        self.data_event_handlers.iter().for_each(|handler| {
            handler.data_event_handle(kv);
        });
    }

    /// Adds `load` to the recorded load of server `server_id` (a missing load
    /// counts as 0) and re-registers the server with `silent` set to 1.
    ///
    /// The server is looked up in the local index using the filter built from
    /// `options`. Returns `Some(error)` when the server is not found or the
    /// registration fails, `None` on success.
    async fn incre_load(
        &mut self,
        server_id: &str,
        load: i64,
        options: Option<Vec<FilterOption>>,
    ) -> Option<DiscoverError> {
        let mut data = match self.get_server_by_id(server_id, options) {
            Some(found) => found,
            None => return Some(DiscoverError::ServerNotFound(server_id.into())),
        };
        let next_load = match data.load {
            Some(current) => load + current,
            None => load,
        };
        data.load = Some(next_load);
        data.silent = Some(1);
        let outcome = self.register(&data).await;
        outcome.err()
    }

    /// Subtracts `load` from the recorded load of server `server_id` (a
    /// missing load counts as 0) and re-registers the server with `silent`
    /// set to 1.
    ///
    /// The server is looked up in the local index using the filter built from
    /// `options`. Returns `Some(error)` when the server is not found or the
    /// registration fails, `None` on success.
    async fn decre_load(
        &mut self,
        server_id: &str,
        load: i64,
        options: Option<Vec<FilterOption>>,
    ) -> Option<DiscoverError> {
        let mut data = match self.get_server_by_id(server_id, options) {
            Some(found) => found,
            None => return Some(DiscoverError::ServerNotFound(server_id.into())),
        };
        let delta = -load;
        let next_load = match data.load {
            Some(current) => delta + current,
            None => delta,
        };
        data.load = Some(next_load);
        data.silent = Some(1);
        let outcome = self.register(&data).await;
        outcome.err()
    }

    /// Stores `value` in etcd under the data prefix plus `key`.
    ///
    /// Returns `Some(DiscoverError::EtcdPutError)` when the put request fails,
    /// `None` on success.
    async fn put_data(&mut self, key: String, value: String) -> Option<DiscoverError> {
        let data_key = self.data_key(&key);
        let outcome = self.client.put(data_key, value, None).await;
        outcome
            .err()
            .map(|err| DiscoverError::EtcdPutError(err.to_string()))
    }

    /// Deletes the etcd entry stored under the data prefix plus `key`.
    ///
    /// Returns `Some(DiscoverError::EtcdRemoveError)` when the delete request
    /// fails, `None` on success.
    async fn remove_data(&mut self, key: &str) -> Option<DiscoverError> {
        let data_key = self.data_key(key);
        let outcome = self.client.delete(data_key, None).await;
        outcome
            .err()
            .map(|err| DiscoverError::EtcdRemoveError(err.to_string()))
    }

    /// Reads the value stored in etcd under the data prefix plus `key`.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoverError::EtcdGetError`] when the request fails, when no
    /// entry exists for the key (message `"not found"`), or when the stored
    /// value is not valid UTF-8.
    async fn get_data(&mut self, key: &str) -> Result<String, DiscoverError> {
        let data_key = self.data_key(key);
        let res = self
            .client
            .get(data_key, None)
            .await
            .map_err(|err| DiscoverError::EtcdGetError(err.to_string()))?;
        let kv = res
            .kvs()
            .first()
            .ok_or_else(|| DiscoverError::EtcdGetError("not found".into()))?;
        // The value comes from an external store: surface bad bytes as an
        // error instead of panicking.
        let value = kv
            .value_str()
            .map(String::from)
            .map_err(|err| DiscoverError::EtcdGetError(err.to_string()));
        value
    }
}

/// Configuration for [`EtcdDiscoverer`]: key prefixes plus etcd connection
/// settings.
#[derive(Debug)]
pub struct EtcdOptions {
    /// Prefix under which servers are registered, listed and watched.
    server_prefix: String,
    /// Prefix prepended to keys used by the data get/put/remove methods.
    data_prefix: String,
    // NOTE(review): set by `with_timeout` but not read anywhere in this file —
    // confirm whether it is meant to be applied to the connection.
    timeout: Option<Duration>,
    /// Options passed to `Client::connect`.
    options: Option<ConnectOptions>,
}

impl EtcdOptions {
    /// Creates options with the default server and data prefixes, no timeout
    /// and no connection options.
    pub fn new() -> Self {
        let server_prefix = String::from(DEFAULT_SERVER_PREFIX);
        let data_prefix = String::from(DEFAULT_DATA_PREFIX);
        Self {
            server_prefix,
            data_prefix,
            timeout: None,
            options: None,
        }
    }

    /// Sets the server prefix to `server_prefix` wrapped in slashes
    /// (`/<server_prefix>/`).
    pub fn with_server_prefix(self, server_prefix: String) -> Self {
        Self {
            server_prefix: format!("/{}/", server_prefix),
            ..self
        }
    }

    /// Sets the data prefix to `data_prefix` wrapped in slashes
    /// (`/<data_prefix>/`).
    pub fn with_data_prefix(self, data_prefix: String) -> Self {
        Self {
            data_prefix: format!("/{}/", data_prefix),
            ..self
        }
    }

    /// Records `timeout` in the options.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Sets the etcd connection options.
    pub fn with_options(self, options: ConnectOptions) -> Self {
        Self {
            options: Some(options),
            ..self
        }
    }

    /// Finishes the builder, wrapping the options in `Some` so the result can
    /// be passed directly as an optional argument.
    pub fn build(self) -> Option<Self> {
        Some(self)
    }
}
